Conversation
fix: default task tracker to memory backend
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Explore these optional code suggestions:
|
MaojiaSheng
approved these changes
May 9, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
OpenViking Task Tracker 持久化设计与当前实现说明
1. 背景
OpenViking 的异步任务跟踪由
TaskTracker提供,典型任务包括:session_commitadd_resourceadd_skilladmin_reindex这些接口会返回
task_id,调用方再通过下面两个接口查询状态:GET /api/v1/tasks/{task_id}GET /api/v1/tasks原始实现只有进程内内存映射:
task_id -> TaskRecord这会带来两个问题:
task_id,请求打到另一个实例就查不到。当前实现已经把
TaskTracker改造成“接口基本不变、存储后端可插拔”的结构,同时支持:memorypersistent其中
persistent方案可以满足:task_idtask_id2. 设计目标
当前实现的目标如下:
TaskTracker的核心对外能力。memory和persistent后端之间切换。persistent模式下,任务状态对多实例可见。task_id + account_id + user_id继续更新同一任务。3. 非目标
当前实现明确不处理下面这些复杂问题:
failed。4. 当前总体架构
4.1 组件
当前实现涉及三个核心部分:
TaskTrackeropenviking/service/task_tracker.pyTaskStoreopenviking/service/task_store.pyStorageConfig.build_task_tracker()openviking_cli/utils/config/storage_config.pyTaskTracker4.2 存储后端
TaskStore当前有两种实现:InMemoryTaskStorePersistentTaskStoreTaskTracker内部既保留:_tasks: Dict[str, TaskRecord]也持有:
self._store其中:
memory模式下,self._store是InMemoryTaskStorepersistent模式下,self._store是PersistentTaskStore5. 配置方式
当前配置项如下:
{ "storage": { "task_tracker": { "backend": "memory" } } }支持值:
memorypersistent5.1 默认值
当前默认值是:
{ "storage": { "task_tracker": { "backend": "memory" } } }也就是说:
persistent5.2 构建逻辑
当前构建逻辑是:
backend == "memory"->TaskTracker()backend == "persistent"->TaskTracker(store=PersistentTaskStore(agfs))6. 数据模型
6.1 TaskRecord
当前
TaskRecord字段如下:说明:
account_id/user_id是任务归属信息。TaskRecord.to_dict()对外返回时会去掉account_id/user_id。schema_version字段。6.2 持久化 JSON 结构
持久化文件内部直接存
TaskRecord的 JSON 表达,示例如下:{ "task_id": "5f0ec6f0-9e69-4d32-8dc7-2e5f4c2efabc", "task_type": "add_resource", "status": "completed", "created_at": 1778300000.123, "updated_at": 1778300002.456, "resource_id": "viking://resources/demo", "account_id": "acme", "user_id": "alice", "result": { "root_uri": "viking://resources/demo" }, "error": null }7. 持久化路径设计
7.1 当前物理路径
当前
PersistentTaskStore的落盘路径是:例如:
7.2 这样设计的原因
当前接口查询语义是按
account_id + user_id过滤,因此路径上也带上user_id,这样更一致:account_id负责租户级隔离user_id负责租户内用户级分桶task_id是该用户任务目录下的唯一键相较于旧版的:
当前路径更符合现有接口行为。
7.3 目录层次
当前目录结构含义如下:
7.4 保留目录约束
tasks被视为内部保留目录,不应被当作普通业务资源使用。当前实现里:
openviking/storage/viking_fs.py已经将
tasks视为内部保留名称的一部分,用于避免被普通资源视图误用。8. TaskStore 抽象
8.1 当前接口
当前
TaskStore接口如下:create(task)update(task)get(task_id, account_id=None, user_id=None)list(account_id, user_id=None)delete(task_id, account_id, user_id=None)8.2 InMemoryTaskStore
InMemoryTaskStore的特点:account_id/user_id过滤TaskTracker的 TTL 清理删除内存记录8.3 PersistentTaskStore
PersistentTaskStore的特点:/local/...create/update都是覆盖写目标 JSON 文件get必须提供:account_iduser_idlist当前只支持按单个 user 目录扫描delete当前也要求提供account_id + user_id8.4 目录创建行为
PersistentTaskStore在写入前会确保目录存在:/local/{account_id}/local/{account_id}/tasks/local/{account_id}/tasks/{user_id}当前实现对
mkdir already exists做了幂等处理,避免重复创建时报错。9. TaskTracker 当前行为
9.1 创建任务
创建任务时:
account_iduser_id_tasksself._store.create(task)适用接口:
create()create_if_no_running()9.2 更新任务
当前以下接口都支持显式传入:
start(task_id, account_id=None, user_id=None)complete(task_id, result=None, account_id=None, user_id=None)fail(task_id, error, account_id=None, user_id=None)行为分两种:
如果当前实例内存里已经有这个
task_idself._store如果当前实例内存里没有这个
task_idaccount_id + user_id时这就是“跨实例 / 重启后继续更新同一个 task”的基础。
9.3 查询任务
get(task_id, account_id=None, user_id=None)行为:_tasksaccount_id_matches_owner()做account_id + user_id过滤9.4 列表与查重
list_tasks()和has_running()在带account_id的情况下,会先尝试从 store 加载该范围内任务,再做过滤。当前在
persistent模式下,真正的加载粒度已经是:account_id + user_id因为
PersistentTaskStore.list()当前只按单 user 目录列举。10. HTTP 查询语义
10.1 当前
/api/v1/tasks语义/api/v1/tasks当前不是只按account过滤,而是同时按:account_iduser_id过滤返回。
路由位于:
openviking/server/routers/tasks.py10.2 当前
/api/v1/tasks/{task_id}语义单任务查询同样按:
account_iduser_id限制可见性。
也就是说:
11. 后续异步阶段如何更新同一个 task_id
11.1 当前要求
如果任务后续更新有可能发生在:
那么后续更新代码必须显式携带:
task_idaccount_iduser_id11.2 原因
因为当前持久化路径是:
如果没有
user_id,就无法从持久化层准确定位到文件。11.3 当前已经补齐的调用链
当前实现里,下面这些异步更新链路都已经显式传递
account_id + user_id:session.commit_async()后续阶段ResourceService.add_resource()队列监控阶段ResourceService.add_skill()队列监控阶段ReindexExecutor._run_tracked()12. 清理语义
12.1 memory 模式
memory模式保留原始的 TTL 清理行为:COMPLETED保留 24 小时FAILED保留 7 天MAX_TASKS=10000时做 FIFO 淘汰12.2 persistent 模式
当前一期实现里:
TaskTracker仍然会清理当前进程内_tasks缓存也就是说:
这是当前实现有意保留的简化策略。
13. 当前实现的优点
TaskTracker主接口仍然可用。memory和persistent可切换,便于渐进上线。persistent模式能满足最基本的跨实例查询诉求。account_id + user_id分桶与当前 HTTP 查询语义一致。task_idaccount_iduser_id就能继续更新同一任务。
14. 当前实现的限制
failed。PersistentTaskStore.list()当前只支持 user 级列举,不支持 account 级聚合枚举。start/complete/fail在跨实例场景下必须显式传user_id,否则无法命中持久化路径。15. 当前实现总结
当前代码已经落地了一个“一期可用”的持久化 task tracker:
memorypersistent后,可通过:/local/{account_id}/tasks/{user_id}/{task_id}.json提供跨实例查询基础
TaskTracker对外能力保留TaskStore负责存储抽象account_id + user_id这套实现已经满足最核心的集群诉求:
task_idtask_id但它仍然是一个偏简单的一期实现,未来如果需要更完善的运维能力,还可以继续补: